package cn.doitedu.rtdw.data_etl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.*;
import org.apache.commons.net.ntp.TimeStamp;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

import java.nio.charset.StandardCharsets;
import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/2/6
 * @Desc: Search-behavior overview analysis — lightweight aggregation job.
 *        Reads wide-table event details from Kafka, enriches "search" events via an
 *        HTTP NLP service (word segmentation + synonym lookup), folds the follow-up
 *        "search_return" / "search_click" events into per-search state, and writes
 *        the aggregated rows to Doris.
 **/
public class E06_EtlJob_SearchOverviewAgg {
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Logical table mapping the wide-table event details in Kafka.
        tEnv.executeSql(
                " CREATE TABLE mall_events_commondim_kfksource(          "
                        + "     user_id           INT,                            "
                        + "     event_id          string,                         "
                        + "     event_time        bigint,                         "
                        + "     properties        map<string,string>              "
                        + " ) WITH (                                             "
                        + "  'connector' = 'kafka',                              "
                        + "  'topic' = 'mall-events-wide',                     "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                        + "  'properties.group.id' = 'testGroup',                "
                        + "  'scan.startup.mode' = 'latest-offset',            "
                        + "  'value.format'='json',                              "
                        + "  'value.json.fail-on-missing-field'='false',         "
                        + "  'value.fields-include' = 'EXCEPT_KEY')              ");

        // Table -> DataStream of POJOs.
        DataStream<SearchEventBean> ds = tEnv.toDataStream(tEnv.from("mall_events_commondim_kfksource"), SearchEventBean.class);

        // Core logic: keep only search-related events, key by search_id, enrich the
        // initial "search" event via the NLP service, and fold the follow-up
        // return/click events into the per-search state.
        SingleOutputStreamOperator<ResultBean> resultStream =
                ds.filter(bean -> bean.getEvent_id().equals("search") || bean.getEvent_id().equals("search_return") || bean.getEvent_id().equals("search_click"))
                .keyBy(bean -> bean.getProperties().get("search_id"))
                .process(new KeyedProcessFunction<String, SearchEventBean, ResultBean>() {
                    // Per-search_id accumulator (created on the "search" event).
                    ValueState<ResultBean> state;
                    CloseableHttpClient client;
                    HttpPost post;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        state = getRuntimeContext().getState(new ValueStateDescriptor<ResultBean>("res_state", ResultBean.class));

                        // Build the HTTP client and the reusable request skeleton for the
                        // word-segmentation / synonym service.
                        client = HttpClientBuilder.create().build();
                        post = new HttpPost("http://doitedu:8081/api/post/simwords");
                        post.addHeader("Content-type", "application/json; charset=utf-8");
                        post.addHeader("Accept", "application/json");
                    }

                    @Override
                    public void close() throws Exception {
                        // FIX: release the HTTP client's connection pool on task shutdown;
                        // the original never closed it.
                        if (client != null) {
                            client.close();
                        }
                    }

                    @Override
                    public void processElement(SearchEventBean eventBean, KeyedProcessFunction<String, SearchEventBean, ResultBean>.Context ctx, Collector<ResultBean> out) throws Exception {
                        String eventId = eventBean.getEvent_id();
                        Map<String, String> props = eventBean.getProperties();

                        if ("search".equals(eventId)) {
                            String keyword = props.get("keyword");
                            // FIX: build the request body with a JSON serializer instead of
                            // string concatenation, so quotes/backslashes in the keyword
                            // cannot produce a malformed payload.
                            JSONObject requestBody = new JSONObject();
                            requestBody.put("origin", keyword);
                            post.setEntity(new StringEntity(requestBody.toJSONString(), StandardCharsets.UTF_8));

                            String splitWords;
                            String similarWord;
                            // FIX: close the response when done — the original leaked it,
                            // which exhausts the client's connection pool.
                            try (CloseableHttpResponse response = client.execute(post)) {
                                // Response shape:
                                // {"origin":"移动固态硬盘","words":"移动|固态|硬盘|","similarWord":"移动固态硬盘"}
                                String resJson = EntityUtils.toString(response.getEntity(), "utf-8");
                                JSONObject jsonObject = JSON.parseObject(resJson);
                                splitWords = jsonObject.getString("words");     // segmentation result
                                similarWord = jsonObject.getString("similarWord"); // synonym result
                            }

                            // Seed the per-search accumulator.
                            ResultBean resultBean =
                                    new ResultBean(eventBean.getUser_id(), eventBean.getEvent_time(),
                                            props.get("search_id"), keyword, splitWords, similarWord,
                                            0, 0);
                            state.update(resultBean);
                            out.collect(resultBean);
                            return;
                        }

                        ResultBean cached = state.value();
                        if (cached == null) {
                            // FIX: a return/click arrived before (or without) its "search"
                            // event; the original NPE'd here. Skip it instead of crashing.
                            return;
                        }

                        if ("search_return".equals(eventId)) {
                            cached.setReturn_item_count(Integer.parseInt(props.get("res_cnt")));
                        } else if ("search_click".equals(eventId)) {
                            // Zero the return count on click emissions — presumably so the
                            // downstream aggregate does not re-count the return size once
                            // per click (TODO: confirm against the Doris table's
                            // aggregation model).
                            cached.setReturn_item_count(0);
                            cached.setClick_item_count(1);
                        }
                        // FIX (answers the original code's open question): mutating the
                        // object from state.value() only persists with the heap state
                        // backend; with RocksDB every read deserializes a copy, so the
                        // mutation must be written back explicitly.
                        state.update(cached);
                        out.collect(cached);
                    }
                });

        // Register the result stream as a view for SQL access.
        tEnv.createTemporaryView("tmp", resultStream);
        // tEnv.executeSql("select * from tmp").print();


        // Logical table mapping the search-overview aggregate table in Doris.
        tEnv.executeSql(
                " create table search_agg_01_doris(    "
                        + "     dt         DATE,         "
                        + "     user_id          INT,          "
                        + "     search_time      timestamp(3), "
                        + "     search_id        VARCHAR(20),  "
                        + "     keyword          VARCHAR(60),  "
                        + "     split_words      VARCHAR(60),  "
                        + "     similar_word     VARCHAR(60),  "
                        + "     return_item_count     INT,     "
                        + "     click_item_count      INT      "
                        + " ) WITH (                               "
                        + "    'connector' = 'doris',              "
                        + "    'fenodes' = 'doitedu:8030',         "
                        + "    'table.identifier' = 'dws.search_ana_agg',  "
                        + "    'username' = 'root',                "
                        + "    'password' = '',                    "
                        // Unique label prefix per run, required by Doris stream-load.
                        + "    'sink.label-prefix' = 'doris_tl" + System.currentTimeMillis() + "')");


        // Convert epoch-millis to DATE/TIMESTAMP and sink the rows into Doris.
        tEnv.executeSql(
                " INSERT INTO search_agg_01_doris                                             "
                        +" SELECT                                                                      "
                        +"    to_date(date_format(to_timestamp_ltz(search_time,3),'yyyy-MM-dd')) as dt "
                        +"    ,user_id                                                                 "
                        +"    ,to_timestamp_ltz(search_time,3) as search_time                          "
                        +"    ,search_id                                                               "
                        +"    ,keyword                                                                 "
                        +"    ,split_words                                                             "
                        +"    ,similar_word                                                            "
                        +"    ,return_item_count                                                       "
                        +"    ,click_item_count                                                        "
                        +" FROM  tmp                                                                   "
        );


    }

    /** Incoming wide-table event as deserialized from the Kafka source table. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class SearchEventBean {
        private int user_id;
        private String event_id;
        private long event_time;            // epoch millis
        private Map<String, String> properties;

    }


    /** Per-search aggregate row; field comments mirror the Doris sink columns. */
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class ResultBean {
        private int user_id;//INT,
        private long search_time;//timestamp(3),
        private String search_id;//VARCHAR(20),
        private String keyword;//VARCHAR(60),
        private String split_words;//VARCHAR(60),
        private String similar_word;//VARCHAR(60),
        private int return_item_count;//INT,
        private int click_item_count;//INT
    }
}
